- 
                Notifications
    You must be signed in to change notification settings 
- Fork 34
Add Sample-level Logging API #309
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hey, thanks! Did a first pass. Just a heads up: my PR will add a considerable amount of conflicts. Might be better to rebase before continuing.
        
          
                src/forge/observability/metrics.py
              
                Outdated
          
        
      | record_metric(key, sample, Reduce.SAMPLE) | ||
|  | ||
|  | ||
| def reduce_metrics_states( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems that the only change is checking if Reduce.SAMPLE.value, but the core stays the same. I dont think that here is the correct place to put this abstraction, otherwise, what if the user wants to add another "if this", "if that".
Please take a look at this PR. I introduced dataclass Metric that also holds the reduction type.
On later steps, users can just iterate over metrics and check, if Reduce.SAMPLE.value, do this, otherwise, do that.
TDLR: lets keep the code here as it was and do the if/else elsewhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. I have rebased this PR to #303 , reverted the changes for reduce_metrics_states, and added the if-else checks in MetricCollector.flush
        
          
                src/forge/observability/metrics.py
              
                Outdated
          
        
      | for key, rows in samples.items(): | ||
| logger.info(f"[{key}] ({len(rows)} samples)") | ||
| for sample in rows: | ||
| pretty = json.dumps(sample, indent=2, ensure_ascii=False) | ||
| logger.info(pretty) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whats a key here? Is it a whole table? Perhaps we could rename to:
for table_name, table_rows in samples.items()?
I am fine if you want to push back, just didnt seem 100% obvious to me at first sight. Rows remind me of tables, so maybe we should try to stay close to wandb nomenclature and rename everything SAMPLE -> TABLE?
Also, should we just do
json.dumps(rows) instead of dumping one sample at a time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The key here is rollout/sample. And you are right, we can make them table_name, table_rows to be more readable.
And this indeed can be simplified with json.dumps(rows). Done!
        
          
                src/forge/observability/metrics.py
              
                Outdated
          
        
      | table = wandb.Table(columns=columns) | ||
|  | ||
| for s in rows: | ||
| values = [s.get(c) for c in columns] | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if c is not present? does it return None or does it break?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be None. I added a comment here to clarify.
| else: | ||
| logger.debug(f"WandbBackend: No run started, skipping log for {self.name}") | ||
|  | ||
| async def log_samples(self, samples: Dict[str, List[dict]], step: int) -> None: | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following my previous comment, maybe we should keep wandb nomenclature and have it be log_table
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. W&B uses the term table, but since this method is part of the generic LoggerBackend interface(and the console backend doesn’t actually log tables):
class LoggerBackend(ABC):
    """Abstract logger_backend for metric logging, e.g. wandb, jsonl, etc."""
...
    async def log_samples(self, samples: Dict[str, List[dict]], step: int) -> None:
        passLet's keep the name log_samples() for consistency across backends?
        
          
                src/forge/observability/metrics.py
              
                Outdated
          
        
      | """ | ||
|  | ||
| def __init__( | ||
| self, reduction: Reduce, filter: TopBottomKFilter | None = TopBottomKFilter() | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is not configurable, i think we should hardcode this filter logic in the SampleAccumulator.
We could drop self.samples here (and the if/else filter checks). I also dont think we need 'self._counter' logic.
If the user wants to change the logic, i am thinking that its more convenient for them to just create a new SampleAccumulatorV2. They have no way to pass down from the config the arg filter to this accumulator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That’s a fair point. But I’d like to ask whether we envision making this configurable in the near future. If the answer is a solid yes, then I’d still prefer to keep the filter logic separate for now. It’s much easier (and more natural) for users to define a lightweight filter class than to implement an entirely new SampleAccumulatorV2.
Merging the logic into SampleAccumulator would make it harder to evolve later. We’d eventually need to split them apart again once customization becomes necessary.
Also, from a practical standpoint, the current design doesn’t introduce meaningful overhead: TopBottomKFilter.filter_append() always returns False, so we’re not storing samples twice. The filter only materializes the top/bottom-k subset at flush time.
Given that, I’d suggest keeping them separate for now to preserve flexibility for near-term extensions (the top/bottom-k filter may already be too limited for internal debugging needs).
But I agree that since we are hardcoding it, we don't need to make it over complicated. How about we do this and not make filter a parameter:
class SampleAccumulator(MetricAccumulator):
    def __init__(self, reduction: Reduce):
        super().__init__(reduction)
        self.samples: List[Dict[str, Any]] = []
        self.filter = TopBottomKFilter()| input_ids[i, :max_req_tokens] = episode.request_tensor | ||
| input_ids[i, max_req_tokens:] = episode.response_tensor | ||
| episode.reward = await reward_actor.evaluate_response.route( | ||
| episode.reward_breakdown = await reward_actor.evaluate_response.route( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i like this, but i think it can be a bit dangerous. We dont have a dataclass that says the fields that it will hold. You would also need to make sure that the other actors are aware of this change. I am thinking that maybe we should keep episode.reward: float and add an extra optional field episode.reward_breakdown: dict[float]. Wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly! it comes with two field reward and reward_breakdown. If you look at the line below it:
episode.reward = episode.reward_breakdown["reward"]| @felipemello1 do i review #303 before this? | 
| 
 @allenwang28 I have rebased this on #303. So let's land 303 first? | 
| yes, lets land 303 first. I am debugging one thing about wandb. I think i found a solution. Then we should be able to merge it | 
| Codecov Report❌ Patch coverage is  Additional details and impacted files@@           Coverage Diff           @@
##             main     #309   +/-   ##
=======================================
  Coverage        ?   64.88%           
=======================================
  Files           ?       80           
  Lines           ?     7982           
  Branches        ?        0           
=======================================
  Hits            ?     5179           
  Misses          ?     2803           
  Partials        ?        0           ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
 | 
This PR introduces structured sample-level logging to complement existing scalar metrics, allowing users to inspect concrete prompt–response–reward examples during RL training.
More discussion: #301
The current implementation logs 2 samples: one top (highest reward) and one bottom (lowest reward) for each step. Supporting customized sampling strategy is out of scope of this PR. For now, this can be achieved by changing the filter for
SampleAccumulator. For example:Summary of Changes
SampleAccumulatorto support logging structured dict samples (e.g., per-episode data) viarecord_episode_sampleAPI withReduce.SAMPLE.TopBottomKFilterfor selecting top/bottom samples based on reward (heap-based, O(log k) per append).wandb.Tablefor live sample inspection.Logged Fields
Each logged sample includes:
{ "episode_id": episode.episode_id, "policy_version": episode.policy_version, "prompt": episode.request, "response": episode.response, "target": str(episode.target), **(episode.reward_breakdown or {}), # per-reward-fn breakdown + avg reward "advantage": episode.advantage, "ref_logprobs": float( episode.ref_logprobs.mean().item() if episode.ref_logprobs is not None else None ), "request_len": episode.request_len, "response_len": episode.response_len, "pad_id": episode.pad_id, }Tests:
python apps/toy_rl/toy_metrics/main.pypython -m apps.grpo.main --config apps/grpo/qwen3_1_7b.yamlfor modes:logging_mode=global_reducelogging_mode=per_rank_reducewithper_rank_share_run=False. The table is logged in the runController_xxxlogging_mode=per_rank_reducewithper_rank_share_run=Truelogging_mode=per_rank_no_reducewithper_rank_share_run=False. Because there's no reduce, so we are logging more than 2 samples each step.logging_mode=per_rank_no_reducewithper_rank_share_run=True(not working)Backend: wandb
Backend: Console:
========== SAMPLE LOGS STEP 1 ========== [rollout/sample] (2 samples) { "episode_id": "ea2f341b-bf7a-4647-9913-c5f2c8bb525f", "policy_version": 0, "prompt": "<|im_start|>system\n\n Put all your scratchpad work between <think> and </think> tags.\n Your final answer should be between <answer> and </answer> tags otherwise it will not be scored.\n <|im_end|>\n<|im_start|>user\nJames buys $3000 worth of stuff from Amazon. He has to return a TV that cost $700 and a bike that cost $500. He also sells another bike that cost 20% more than the bike he returned for 80% of what he bought it for. He then buys a toaster for $100. How much is he out of pocket for everything?<|im_end|>\n<|im_start|>assistant\n", "response": "<think>\nOkay, let me try to figure out how much James is out of pocket for everything. Hmm, let's start by breaking down the problem step by step.\n\nFirst, James buys $3000 worth of stuff from Amazon. So, his initial total cost is $3000. But then he has to return a TV that cost $700 and a bike that cost $500. Wait, so when he returns those items, he's getting them back, right? So maybe that means he's losing the value of those items? But I need to check if that's how it's calculated. \n\nBut the question is about how much he's out of pocket. So, when he returns the TV and bike, he's probably losing the amount he paid for them. So, the TV was $700, bike was $500. So, total money he gave back is 700 + 500 = $1200. But he had $3000 initially. So, if he returns $1200, he's out of pocket for those items? Wait, but maybe he has to pay for them back? Or does returning them mean he's just not keeping them? \n\nWait, maybe I need to think in terms of net cash flow. Let me think. When he buys stuff, he spends money. When he returns stuff, he gives money back, so that's like a deduction from his total expenditure. \n\nSo, initial purchase: $3000. Then he returns $700 (TV) and $500 (bike). So, total returned is $1200. Therefore, his net expenditure so far is $3000 - $1200 = $1800. But then he sells another bike that cost 20% more than the bike he returned. Wait, the bike he returned was $500, so the bike he sold is 20% more than that. Let me calculate that. 20% of 500 is 100, so the bike he sold cost 500 + 100 = $600. But he sells it for 80% of what he bought it for. So, he bought it for $600, so 80% of that is 0.8 * 600 = $480. So", "target": "2020", "MathReward": 0.0, "ThinkingReward": 0.2, "reward": 0.1, "advantage": -7.449986878782511e-05, "ref_logprobs": -1.359375, "request_len": 512, "response_len": 512, "pad_id": 151643 } { "episode_id": "b0a1d37d-c3a6-403a-8e9f-8ae510b6702a", "policy_version": 0, "prompt": "<|im_start|>system\n\n Put all your scratchpad work between <think> and </think> tags.\n Your final answer should be between <answer> and </answer> tags otherwise it will not be scored.\n <|im_end|>\n<|im_start|>user\nJohn uses 5 liters of fuel per km to travel. How many liters of fuel should John plan to use if he plans to travel on two trips of 30 km and 20 km?<|im_end|>\n<|im_start|>assistant\n", "response": "<think>\nOkay, let's see. John is traveling twice, first 30 km and then 20 km. He uses 5 liters per km. So I need to calculate the total fuel needed for both trips.\n\nFirst, I should figure out how much fuel he uses for each trip separately and then add them together. For the first trip, 30 km multiplied by 5 liters per km. Let me do that calculation: 30 times 5 is 150 liters. Then for the second trip, 20 km times 5 liters per km. 20 times 5 is 100 liters. Adding those two amounts together, 150 plus 100 equals 250 liters. So John should plan to use 250 liters of fuel in total. I think that's it. Let me double-check. 30 km trip uses 150 liters, 20 km uses 100. Total is indeed 250. Yep, that seems right.\n</think>\n\nJohn uses 5 liters of fuel per km. For the first trip of 30 km, he uses $30 \\times 5 = 150$ liters. For the second trip of 20 km, he uses $20 \\times 5 = 100$ liters. Total fuel required is $150 + 100 = 250$ liters.\n\n<answer>250</answer>", "target": "250", "MathReward": 1.0, "ThinkingReward": 1.0, "reward": 1.0, "advantage": 0.5398026704788208, "ref_logprobs": -4.90625, "request_len": 512, "response_len": 512, "pad_id": 151643 } ==============================================Notes